Skip to content

PydanticAI 核心概念

本章概览

本章将深入讲解 PydanticAI 的核心概念,帮助你掌握:

  • Agent 的完整配置和使用
  • Tools 的定义和最佳实践
  • Dependencies 依赖注入系统
  • Output 输出类型和验证
  • Messages 消息管理
  • Streaming 流式处理

1. Agent 详解

1.1 Agent 创建

Agent 是 PydanticAI 的核心,创建方式非常灵活:

python
from pydantic_ai import Agent
from pydantic import BaseModel

# 最简单的创建方式
agent = Agent('openai:gpt-4o')

# 带系统指令
agent = Agent(
    'anthropic:claude-sonnet-4-0',
    instructions='你是一个友好的助手,使用中文回答问题。'
)

# 完整配置
class OutputFormat(BaseModel):
    answer: str
    confidence: float

agent = Agent(
    'openai:gpt-4o',
    instructions='分析用户问题并给出答案和置信度',
    output_type=OutputFormat,
    model_settings={'temperature': 0.7, 'max_tokens': 1000},
)

1.2 Agent 参数详解

python
Agent(
    model: str | Model,              # 必需:模型标识
    *,
    # 输出配置
    output_type: type[T] = str,      # 输出类型(默认字符串)

    # 依赖配置
    deps_type: type[D] = None,       # 依赖类型

    # 指令配置
    instructions: str = None,         # 静态指令(推荐)
    system_prompt: str = None,        # 系统提示(旧版兼容)

    # 工具配置
    tools: list[Tool] = None,         # 工具列表
    mcp_servers: list[MCPServer] = None,  # MCP 服务器

    # 模型配置
    model_settings: ModelSettings = None,  # 模型参数

    # 验证配置
    output_validators: list[Callable] = None,  # 输出验证器

    # 历史处理
    history_processors: list[Callable] = None,  # 消息历史处理器

    # 其他
    name: str = None,                 # Agent 名称(用于追踪)
    retries: int = 1,                 # 默认重试次数
)

1.3 动态指令

除了静态指令,还可以使用动态指令:

python
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

@dataclass
class UserContext:
    name: str
    language: str

agent = Agent('openai:gpt-4o', deps_type=UserContext)

# 动态系统指令
@agent.instructions
def get_instructions(ctx: RunContext[UserContext]) -> str:
    return f"""
    你正在与 {ctx.deps.name} 交谈。
    请使用 {ctx.deps.language} 回答问题。
    """

# 运行时注入上下文
result = agent.run_sync(
    '你好!',
    deps=UserContext(name='张三', language='中文')
)

1.4 多种运行方式

python
# 1. 同步运行
result = agent.run_sync('Hello')
print(result.output)

# 2. 异步运行
import asyncio

async def main():
    result = await agent.run('Hello')
    print(result.output)

asyncio.run(main())

# 3. 流式文本
async def stream_example():
    async with agent.run_stream('Tell me a story') as stream:
        async for text in stream.stream_text():
            print(text, end='', flush=True)

# 4. 流式增量
async def stream_delta():
    async with agent.run_stream('Tell me a story') as stream:
        async for delta in stream.stream_text(delta=True):
            print(delta, end='', flush=True)

# 5. 流式事件
async def stream_events():
    async with agent.run_stream_events('Hello') as stream:
        async for event in stream:
            print(f"Event: {event.kind}")

2. Tools 工具系统

2.1 工具定义方式

python
from pydantic_ai import Agent, RunContext, Tool

agent = Agent('openai:gpt-4o', deps_type=str)

# 方式 1: @agent.tool - 需要上下文
@agent.tool
async def greet_user(ctx: RunContext[str]) -> str:
    """
    向用户打招呼

    Returns:
        个性化的问候语
    """
    return f"你好,{ctx.deps}!"

# 方式 2: @agent.tool_plain - 不需要上下文
@agent.tool_plain
def add_numbers(a: int, b: int) -> int:
    """
    计算两个数字的和

    Args:
        a: 第一个数字
        b: 第二个数字

    Returns:
        两数之和
    """
    return a + b

# 方式 3: 手动创建 Tool 对象
def multiply(x: float, y: float) -> float:
    """将两个数相乘"""
    return x * y

manual_tool = Tool(multiply, takes_ctx=False)

2.2 工具参数类型

PydanticAI 支持丰富的参数类型:

python
from typing import Literal, Optional
from pydantic import BaseModel, Field
from datetime import datetime

# 简单类型
@agent.tool_plain
def simple_tool(
    text: str,           # 字符串
    count: int,          # 整数
    ratio: float,        # 浮点数
    enabled: bool,       # 布尔值
) -> str:
    """简单类型示例"""
    return f"{text} x {count}"

# 可选参数
@agent.tool_plain
def optional_tool(
    required: str,
    optional: str = "default",      # 有默认值的可选参数
    nullable: Optional[str] = None,  # 可为 None
) -> str:
    """可选参数示例"""
    return required

# 枚举类型
@agent.tool_plain
def enum_tool(
    action: Literal["create", "update", "delete"],  # 字面量类型
    priority: Literal[1, 2, 3] = 2,
) -> str:
    """枚举类型示例"""
    return f"Action: {action}, Priority: {priority}"

# 复杂类型
class SearchParams(BaseModel):
    query: str = Field(description="搜索关键词")
    limit: int = Field(default=10, ge=1, le=100)
    filters: dict[str, str] = Field(default_factory=dict)

@agent.tool_plain
def complex_tool(params: SearchParams) -> list[dict]:
    """复杂类型示例"""
    return [{"query": params.query}]

2.3 工具文档

PydanticAI 从 docstring 提取工具描述:

python
# 支持 Google 风格
@agent.tool_plain
def google_style(
    name: str,
    age: int,
) -> str:
    """
    处理用户信息

    Args:
        name: 用户姓名
        age: 用户年龄

    Returns:
        处理结果字符串
    """
    return f"{name} is {age} years old"

# 支持 NumPy 风格
@agent.tool_plain
def numpy_style(name: str, age: int) -> str:
    """
    处理用户信息

    Parameters
    ----------
    name : str
        用户姓名
    age : int
        用户年龄

    Returns
    -------
    str
        处理结果字符串
    """
    return f"{name} is {age} years old"

2.4 工具重试

当工具需要 LLM 重新调用时:

python
from pydantic_ai import ModelRetry

@agent.tool_plain
def validate_email(email: str) -> str:
    """验证邮箱格式"""
    if "@" not in email:
        # 触发重试,让 LLM 提供正确格式
        raise ModelRetry("无效的邮箱格式,请提供包含 @ 的有效邮箱")
    return f"邮箱 {email} 格式正确"

2.5 异步工具

python
import httpx

@agent.tool
async def fetch_weather(ctx: RunContext, city: str) -> dict:
    """
    获取城市天气

    Args:
        city: 城市名称
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.weather.com/{city}"
        )
        return response.json()

3. Dependencies 依赖系统

3.1 定义依赖

python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

# 使用 dataclass 定义依赖
@dataclass
class AppDependencies:
    database: DatabaseConnection
    cache: CacheClient
    current_user_id: int
    api_key: str

# 或使用 Pydantic BaseModel
from pydantic import BaseModel

class AppDeps(BaseModel):
    db_url: str
    user_id: int
    debug: bool = False

    class Config:
        arbitrary_types_allowed = True

3.2 使用依赖

python
agent = Agent(
    'openai:gpt-4o',
    deps_type=AppDependencies,
)

# 在工具中使用
@agent.tool
async def get_user_orders(ctx: RunContext[AppDependencies]) -> list[dict]:
    """获取用户订单"""
    # 通过 ctx.deps 访问依赖
    orders = await ctx.deps.database.query(
        "SELECT * FROM orders WHERE user_id = ?",
        ctx.deps.current_user_id
    )
    return orders

# 在动态指令中使用
@agent.instructions
def get_instructions(ctx: RunContext[AppDependencies]) -> str:
    return f"当前用户 ID: {ctx.deps.current_user_id}"

# 运行时传入
result = agent.run_sync(
    "查看我的订单",
    deps=AppDependencies(
        database=db_conn,
        cache=redis_client,
        current_user_id=12345,
        api_key="secret",
    )
)

3.3 依赖覆盖(测试)

python
import pytest

@pytest.fixture
def mock_deps():
    """创建测试依赖"""
    return AppDependencies(
        database=MockDatabase(),
        cache=MockCache(),
        current_user_id=1,
        api_key="test-key",
    )

def test_agent_with_mock(mock_deps):
    # 使用 override 覆盖依赖
    with agent.override(deps=mock_deps):
        result = agent.run_sync("测试查询")
        assert result.output is not None

4. Output 输出系统

4.1 输出类型

python
from pydantic import BaseModel, Field
from typing import Union, Literal

# 基本 Pydantic 模型
class AnalysisResult(BaseModel):
    """分析结果"""
    sentiment: Literal["positive", "negative", "neutral"]
    confidence: float = Field(ge=0, le=1)
    keywords: list[str]
    summary: str

agent = Agent(
    'openai:gpt-4o',
    output_type=AnalysisResult,
)

result = agent.run_sync("分析这段文本的情感...")
# result.output 类型为 AnalysisResult
print(result.output.sentiment)
print(result.output.confidence)

4.2 联合类型输出

python
class SuccessResponse(BaseModel):
    status: Literal["success"] = "success"
    data: dict

class ErrorResponse(BaseModel):
    status: Literal["error"] = "error"
    error_code: int
    message: str

# 使用联合类型
agent = Agent(
    'openai:gpt-4o',
    output_type=Union[SuccessResponse, ErrorResponse],
)

result = agent.run_sync("执行操作")
if isinstance(result.output, SuccessResponse):
    print("成功:", result.output.data)
else:
    print("失败:", result.output.message)

4.3 输出验证器

python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', output_type=str)

@agent.output_validator
async def validate_output(ctx, output: str) -> str:
    """验证输出不包含敏感信息"""
    sensitive_words = ["password", "secret", "api_key"]
    for word in sensitive_words:
        if word.lower() in output.lower():
            raise ValueError(f"输出包含敏感词: {word}")
    return output

4.4 输出函数

python
from pydantic_ai import Agent, RunContext
from pydantic_ai.output import ToolOutput

# 输出函数作为输出类型
def format_response(answer: str, sources: list[str]) -> str:
    """
    格式化响应

    Args:
        answer: 回答内容
        sources: 引用来源
    """
    formatted_sources = "\n".join(f"- {s}" for s in sources)
    return f"{answer}\n\n来源:\n{formatted_sources}"

agent = Agent(
    'openai:gpt-4o',
    output_type=format_response,  # 函数作为输出类型
)

5. Messages 消息管理

5.1 消息历史

python
# 获取所有消息
result = agent.run_sync("你好")
all_messages = result.all_messages()
new_messages = result.new_messages()

# JSON 格式
messages_json = result.all_messages_json()

5.2 继续对话

python
# 第一轮对话
result1 = agent.run_sync("我叫张三")

# 继续对话,传入历史
result2 = agent.run_sync(
    "我叫什么名字?",
    message_history=result1.all_messages()
)

print(result2.output)  # 应该记得用户叫张三

5.3 消息持久化

python
from pydantic_ai.messages import ModelMessagesTypeAdapter
import json

# 保存消息
messages = result.all_messages()
json_str = ModelMessagesTypeAdapter.to_json(messages)

# 存储到文件或数据库
with open("conversation.json", "w") as f:
    f.write(json_str)

# 加载消息
with open("conversation.json", "r") as f:
    loaded_json = f.read()

loaded_messages = ModelMessagesTypeAdapter.validate_json(loaded_json)

# 继续对话
result = agent.run_sync(
    "继续我们的对话",
    message_history=loaded_messages
)

5.4 消息处理器

python
def truncate_history(messages, max_messages=10):
    """保留最近的消息"""
    if len(messages) <= max_messages:
        return messages
    # 保留系统消息和最近的用户/助手消息
    return messages[:1] + messages[-(max_messages-1):]

agent = Agent(
    'openai:gpt-4o',
    history_processors=[truncate_history],
)

6. Streaming 流式处理

6.1 文本流

python
async def stream_text_example():
    async with agent.run_stream("写一首诗") as stream:
        # 完整文本块
        async for chunk in stream.stream_text():
            print(chunk, end='')

        # 或者增量文本
        async for delta in stream.stream_text(delta=True):
            print(delta, end='', flush=True)

        # 获取最终结果
        result = await stream.get_result()
        print(f"\n\nTokens used: {result.usage.total_tokens}")

6.2 结构化数据流

python
from pydantic import BaseModel

class StoryPart(BaseModel):
    chapter: int
    title: str
    content: str

agent = Agent('openai:gpt-4o', output_type=list[StoryPart])

async def stream_structured():
    async with agent.run_stream("写一个三章的故事") as stream:
        async for partial in stream.stream_structured():
            # 部分验证的数据
            print(f"当前解析: {partial}")

        result = await stream.get_result()
        for part in result.output:
            print(f"第 {part.chapter} 章: {part.title}")

6.3 事件流

python
from pydantic_ai.events import (
    ModelRequestEvent,
    ModelResponseEvent,
    ToolCallEvent,
    ToolReturnEvent,
)

async def stream_events_example():
    async with agent.run_stream_events("使用工具完成任务") as stream:
        async for event in stream:
            match event:
                case ModelRequestEvent():
                    print("📤 发送请求到模型")
                case ModelResponseEvent():
                    print("📥 收到模型响应")
                case ToolCallEvent(tool_name=name):
                    print(f"🔧 调用工具: {name}")
                case ToolReturnEvent(tool_name=name):
                    print(f"✅ 工具返回: {name}")

7. 高级特性

7.1 Usage Limits

python
from pydantic_ai import UsageLimits

# 设置使用限制
limits = UsageLimits(
    request_limit=5,           # 最多 5 次请求
    response_tokens_limit=500, # 响应最多 500 token
    total_tokens_limit=1000,   # 总共最多 1000 token
    tool_calls_limit=3,        # 最多调用 3 次工具
)

result = agent.run_sync(
    "复杂任务",
    usage_limits=limits,
)

# 检查使用情况
print(f"请求次数: {result.usage.requests}")
print(f"总 tokens: {result.usage.total_tokens}")

7.2 Model Settings

python
from pydantic_ai import ModelSettings

settings = ModelSettings(
    temperature=0.7,           # 创造性
    max_tokens=2000,           # 最大输出
    top_p=0.9,                 # nucleus 采样
    seed=42,                   # 随机种子(可复现)
)

agent = Agent(
    'openai:gpt-4o',
    model_settings=settings,
)

# 运行时覆盖
result = agent.run_sync(
    "创意写作",
    model_settings=ModelSettings(temperature=0.9),
)

7.3 Retries 重试

python
# Agent 级别默认重试
agent = Agent('openai:gpt-4o', retries=3)

# 运行时指定
result = agent.run_sync("任务", retries=5)

7.4 Model Override

python
# 运行时使用不同模型
agent = Agent('openai:gpt-4o')

# 使用更便宜的模型
result = agent.run_sync(
    "简单任务",
    model='openai:gpt-4o-mini',
)

# 使用更强大的模型
result = agent.run_sync(
    "复杂任务",
    model='anthropic:claude-sonnet-4-0',
)

8. MCP 集成

8.1 连接 MCP 服务器

PydanticAI MCP 文档图:PydanticAI MCP 集成文档

python
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServer

# 连接本地 MCP 服务器
mcp = MCPServer(
    "uvx mcp-server-fetch",  # MCP 服务器命令
    transport="stdio",
)

agent = Agent(
    'openai:gpt-4o',
    mcp_servers=[mcp],
)

# Agent 自动获得 MCP 服务器提供的工具
result = agent.run_sync("使用 fetch 工具获取网页内容")

8.2 使用 FastMCP

python
from pydantic_ai.mcp import FastMCPToolset

# 使用 FastMCP 客户端
toolset = FastMCPToolset(
    "http://localhost:8000",
    transport="http",
)

agent = Agent(
    'openai:gpt-4o',
    tools=toolset.get_tools(),
)

9. 小结

本章介绍了 PydanticAI 的核心概念:

概念作用关键点
Agent核心控制器泛型设计、多种运行方式
ToolsLLM 能力扩展装饰器定义、自动 Schema
Dependencies运行时注入类型安全、便于测试
Output结构化输出Pydantic 验证、自动重试
Messages对话管理历史持久化、继续对话
Streaming实时输出文本流、结构化流

这些概念共同构成了 PydanticAI 的核心能力,使其成为一个强大而易用的 Agent 框架。


上一章:架构设计下一章:与 LangGraph 对比

基于 MIT 许可证发布。内容版权归作者所有。